feat: preserve blob data through Spark shuffle during JOIN + INSERT INTO #355
## Summary

Add support for writing blob v2 columns with external URI references that are outside registered base paths. This enables use cases like INSERT INTO SELECT across Lance tables where the target table stores external blob references pointing to the source table's blob files instead of copying the actual blob bytes.

## Changes

- **WriteParams.java**: Add `allowExternalBlobOutsideBases` Optional<Boolean> field, getter, and builder method
- **Fragment.java**: Pass the new field through `createWithFfiArray` and `createWithFfiStream` native methods
- **fragment.rs (JNI)**: Thread the new `Optional<Boolean>` parameter through all fragment creation functions to `extract_write_params`
- **utils.rs (JNI)**: Parse the new parameter and set `allow_external_blob_outside_bases` on Rust `WriteParams`
- **blocking_dataset.rs (JNI)**: Pass `JObject::null()` for the new param in `Dataset.write()` path (not needed there)

## Context

This is a prerequisite for lance-spark blob JOIN support (lance-format/lance-spark#355). When blob data flows through Spark's shuffle during JOIN + INSERT INTO, the target table needs to write external blob references pointing to the source table's physical blob files. The Rust `BlobPreprocessor` already supports this via `allow_external_blob_outside_bases`, but the Java SDK had no way to set it.

Ref: #6321, #6322

## Test plan

- [x] Rust JNI code compiles cleanly (no errors in changed files)
- [ ] Java unit tests (CI)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
d3a606c to 9fabe62
hamersaw
left a comment
Thanks for the PR! Should be a pretty significant help in reducing memory utilization. A few comments to iron things out, but I think I need to dive through the regular read paths to make sure this doesn't negatively affect performance there as well.
Diving through this a bit more. I'm not sure there's a reason to have a
Good point: collapsed BlobResolvingLargeBinaryWriter into LargeBinaryWriter, so there is now a single writer that always resolves blob references via resolveIfNeeded(). This means blob references are correctly resolved regardless of whether the target column is blob-encoded or regular binary, preventing silent data corruption.
@hamersaw All feedback has been addressed and CI is fully green. Ready for another look when you get a chance!
hamersaw
left a comment
Thanks for the iteration here, I think we're getting close!
@hamersaw Addressed all four comments from the second review; CI is green again. Would appreciate another look when you have a moment!
hamersaw
left a comment
Looks great! Let's resolve merge conflicts and then I'll get it in.
When blob columns flow through Spark's shuffle (e.g., INSERT INTO target SELECT ... FROM source_a JOIN source_b), the actual blob data was previously lost. This PR introduces a blob reference mechanism that preserves blob data through shuffle without materializing the full blob bytes.

Read side: blob columns serialize compact ~100-byte BlobReference descriptors (LANCEREF magic + dataset URI + column name + row address) instead of empty bytes. The scanner requests _rowaddr when blob columns are present and strips it from the output.

Write side: LargeBinaryWriter detects BlobReference headers, buffers them during setValue(), then batch-resolves all references in finish() via a single takeBlobs() call per (dataset, column) group. Dataset instances are cached across batches for the task lifetime.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
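The descriptor described above can be illustrated with a minimal sketch. The field order (magic, dataset URI, column name, row address) comes from the commit message; the length prefixes, byte order, and the class name `BlobRefSketch` are assumptions made for illustration and are not the actual `BlobReference` wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Hypothetical layout (an assumption, not the real wire format):
 * magic | uriLength (int) | uri | columnLength (int) | column | rowAddress (long).
 */
final class BlobRefSketch {
  // Magic string taken from the commit message; everything after it is illustrative.
  static final byte[] MAGIC = "LANCEREF".getBytes(StandardCharsets.US_ASCII);

  final String datasetUri;
  final String columnName;
  final long rowAddress;

  BlobRefSketch(String datasetUri, String columnName, long rowAddress) {
    this.datasetUri = datasetUri;
    this.columnName = columnName;
    this.rowAddress = rowAddress;
  }

  /** Serializes the reference into a compact byte array that can travel through a shuffle. */
  byte[] encode() {
    byte[] uri = datasetUri.getBytes(StandardCharsets.UTF_8);
    byte[] col = columnName.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(MAGIC.length + 4 + uri.length + 4 + col.length + 8);
    buf.put(MAGIC).putInt(uri.length).put(uri).putInt(col.length).put(col).putLong(rowAddress);
    return buf.array();
  }

  /** Returns true when the value starts with the magic header. */
  static boolean isReference(byte[] value) {
    return value != null
        && value.length >= MAGIC.length
        && Arrays.equals(Arrays.copyOf(value, MAGIC.length), MAGIC);
  }

  /** Parses a value produced by encode(); rejects anything without the magic header. */
  static BlobRefSketch decode(byte[] value) {
    if (!isReference(value)) {
      throw new IllegalArgumentException("not a blob reference");
    }
    ByteBuffer buf = ByteBuffer.wrap(value);
    buf.position(MAGIC.length);
    byte[] uri = new byte[buf.getInt()];
    buf.get(uri);
    byte[] col = new byte[buf.getInt()];
    buf.get(col);
    long rowAddress = buf.getLong();
    return new BlobRefSketch(
        new String(uri, StandardCharsets.UTF_8),
        new String(col, StandardCharsets.UTF_8),
        rowAddress);
  }
}
```

On the write side, a check like `isReference` is what would let a writer tell a placeholder apart from ordinary binary data before deciding whether to resolve it.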
75d49e6 to 2cd8627
Addresses review feedback on the blob JOIN/INSERT preservation path and makes vended-credential auto-refresh work for blob sources.

Correctness:
- LargeBinaryWriter now buffers all per-row values and emits the vector in a single ascending pass at finish(), instead of back-filling resolved blobs out of order (which corrupts a LargeVarBinaryVector's offset buffer). resolveBatch() returns an index->bytes map rather than writing into the vector.

Resource lifecycle:
- The BlobReferenceResolver is now created once per write task, shared across batches/fragments, and closed at LanceDataWriter teardown, fixing a leak of native source datasets (one per blob column per batch).

Credentials (the main design change):
- New LanceBlobSourceContextRule optimizer rule collects each blob source table's BlobSourceContext (read options + namespace config) on the driver and stashes them, keyed by source URI, in the write command's options. LanceDataset.newWriteBuilder decodes them and threads them to the per-task resolver, which reopens sources via Utils.openDatasetBuilder().runtimeNamespace(...) so vended credentials keep auto-refreshing, mirroring compaction/index. No global registry, no per-row shuffle bloat. Registered in LanceSparkSessionExtensions.
- BlobReferenceResolver no longer opens datasets directly; falls back to open-by-URI when no context is present (local sources / extension off).

Performance:
- BlobStructAccessor precomputes the constant reference prefix once per batch in setBlobReferenceContext; the per-row path only appends the 8-byte rowAddress.

Misc:
- Extract LargeBinaryWriter to its own file.
- Fold the new constructor arguments into single canonical constructors rather than parallel overloads; update affected tests.
- BaseBlobJoinTest enables the SQL extension so the rule path is covered.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
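The correctness fix above rests on how a variable-width binary vector stores data: each entry's end offset is the previous offset plus its length, so values have to be appended in ascending index order. The sketch below shows the "buffer, then emit once" idea with plain arrays. `OrderedEmitSketch` and `Packed` are hypothetical names, and the real writer targets an Arrow vector rather than these arrays.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.Map;

final class OrderedEmitSketch {
  /** Stand-in for a variable-width binary vector: cumulative offsets plus one data buffer. */
  static final class Packed {
    final long[] offsets;
    final byte[] data;

    Packed(long[] offsets, byte[] data) {
      this.offsets = offsets;
      this.data = data;
    }
  }

  /**
   * Emits every row in one ascending pass. Rows whose index appears in {@code resolved}
   * use the resolved bytes; all other rows use the buffered value unchanged.
   */
  static Packed emit(List<byte[]> buffered, Map<Integer, byte[]> resolved) {
    long[] offsets = new long[buffered.size() + 1];
    ByteArrayOutputStream data = new ByteArrayOutputStream();
    for (int i = 0; i < buffered.size(); i++) {
      byte[] value = resolved.containsKey(i) ? resolved.get(i) : buffered.get(i);
      data.write(value, 0, value.length);
      // Each offset depends on the one before it, which is why back-filling
      // row i after row i + 1 has already been written would corrupt the buffer.
      offsets[i + 1] = offsets[i] + value.length;
    }
    return new Packed(offsets, data.toByteArray());
  }
}
```

Returning an index-to-bytes map from the resolver, as the commit describes, keeps the resolver free of any knowledge about vector layout; only the final pass touches offsets.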
Address PR review feedback on the blob join-preservation path:

- BlobReferenceResolver.resolveBatch: deduplicate row addresses within each (datasetUri, columnName) group and fan resolved bytes back out by address instead of by list position. Guard against takeBlobs count mismatch and null elements by failing loudly rather than writing wrong bytes. Document the takeBlobs ordering/1:1 contract.
- BlobStructAccessor.getBlobReference: test blob size with the primitive UInt8Vector accessor instead of boxing through a per-row BigInteger on the scan hot path.
- LargeBinaryWriter: buffer lazily: write rows directly until the first blob reference, then buffer only the tail. The common non-blob binary case now buffers nothing.

Tests (written as JUnit 5 in Scala so surefire actually executes them; the existing ScalaTest *Suite classes are not picked up by the build):

- LargeBinaryWriterTest: direct/buffered ordering, reference resolution at correct indices, IOException->RuntimeException, reset.
- LanceBlobSourceContextRuleTest: non-Lance target no-op, blob-free source no-op, positive annotation, existing-key guard, idempotence.
- BaseBlobJoinTest: one-to-many JOIN test exercising resolver dedup.

Verified across Spark 3.5 and 4.1 (Scala 2.13).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
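The dedup-and-fan-out step for a single (datasetUri, columnName) group might look roughly like the sketch below. The `fetch` function is a stand-in for the `takeBlobs()` call and is assumed to return one element per requested address, in request order; `DedupResolveSketch` and `resolveGroup` are hypothetical names, not the resolver's actual API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class DedupResolveSketch {
  /**
   * Resolves one group of references.
   *
   * @param rowAddressByIndex batch row index mapped to the source row address it references
   * @param fetch stand-in for the blob fetch; assumed 1:1 with its input, in the same order
   * @return batch row index mapped to the resolved bytes
   */
  static Map<Integer, byte[]> resolveGroup(
      Map<Integer, Long> rowAddressByIndex, Function<List<Long>, List<byte[]>> fetch) {
    // A one-to-many JOIN repeats the same source row, so fetch each address only once.
    List<Long> unique = new ArrayList<>(new LinkedHashSet<>(rowAddressByIndex.values()));
    List<byte[]> fetched = fetch.apply(unique);
    if (fetched == null || fetched.size() != unique.size()) {
      throw new IllegalStateException("fetch returned an unexpected number of blobs");
    }
    Map<Long, byte[]> byAddress = new HashMap<>();
    for (int i = 0; i < unique.size(); i++) {
      byte[] blob = fetched.get(i);
      if (blob == null) {
        throw new IllegalStateException("missing blob for row address " + unique.get(i));
      }
      byAddress.put(unique.get(i), blob);
    }
    // Fan out by address rather than by list position.
    Map<Integer, byte[]> out = new LinkedHashMap<>();
    for (Map.Entry<Integer, Long> entry : rowAddressByIndex.entrySet()) {
      out.put(entry.getKey(), byAddress.get(entry.getValue()));
    }
    return out;
  }
}
```

Failing with an exception on a count mismatch follows the commit's stated preference for failing loudly over writing wrong bytes.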
Blob references flow through the shuffle as ~200-byte placeholders, so the per-batch byte guard sized the references, not the blobs they resolve to. The batch capped at 8192 rows; at finish() those references resolved to full blob bytes all at once (resolved map + vector copy), reliably OOMing an executor at the feature's target scale with maxBatchBytes providing no protection.

Carry the resolved blob size (already known from the source size vector at read time) in BlobReference, and feed an exact buffered-bytes estimate into both the semaphore and queued write buffers' byte budgets. A batch now flushes before its resolved blobs exceed maxBatchBytes, bounding the Arrow vector; peak transient at resolution stays ~2x maxBatchBytes.

- BlobReference: wire format v2 appends 8-byte size
- BlobStructAccessor: stamp real size onto each emitted reference
- LargeBinaryWriter: track pendingBytes, expose estimatedBufferedBytes
- LanceArrowWriter + field-writer base + container writers: propagate estimate
- Semaphore/Queued buffers: add estimate to per-batch byte total

Tests: size round-trip, estimatedBufferedBytes accounting/reset, and an end-to-end buffer test proving references trip the byte budget.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
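A small sketch makes the budgeting problem concrete. It counts how many rows a batch accepts before flushing, first when each row is sized as its ~200-byte placeholder and then when it is sized as the blob it resolves to. The 8192-row cap and the ~200-byte placeholder come from the commit message; the 64 MiB `maxBatchBytes`, the 8 MiB blob size, and the `ByteBudgetSketch` class are illustrative assumptions.

```java
final class ByteBudgetSketch {
  private final long maxBatchBytes;
  private final int maxBatchRows;
  private long pendingBytes;
  private int rows;

  ByteBudgetSketch(long maxBatchBytes, int maxBatchRows) {
    this.maxBatchBytes = maxBatchBytes;
    this.maxBatchRows = maxBatchRows;
  }

  /** Accounts for one row; returns true when the caller should flush the batch. */
  boolean add(long estimatedBytes) {
    pendingBytes += estimatedBytes;
    rows++;
    return pendingBytes >= maxBatchBytes || rows >= maxBatchRows;
  }

  /** Bytes the batch is expected to occupy once its references are resolved. */
  long estimatedBufferedBytes() {
    return pendingBytes;
  }

  /** Clears the accounting after a flush. */
  void reset() {
    pendingBytes = 0;
    rows = 0;
  }

  /** Counts rows accepted before the first flush when every row has the same estimate. */
  static int rowsUntilFlush(long maxBatchBytes, int maxBatchRows, long perRowEstimate) {
    ByteBudgetSketch budget = new ByteBudgetSketch(maxBatchBytes, maxBatchRows);
    int accepted = 0;
    boolean flush = false;
    while (!flush) {
      flush = budget.add(perRowEstimate);
      accepted++;
    }
    return accepted;
  }
}
```

Under these assumed numbers, sizing rows by placeholder lets the batch run to the full 8192-row cap, which would resolve to 64 GiB of blob data, while sizing by resolved blob length flushes after 8 rows.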
@beinan pushed a few commits here that:
@hamersaw Thanks for the approval! Could you merge this when you get a chance?
## Summary

When blob columns flow through Spark's shuffle (e.g., `INSERT INTO target SELECT ... FROM source_a JOIN source_b`), the actual blob data was previously lost because `getBinary()` returned empty byte arrays (`new byte[0]`). The target table ended up with zero-length blobs.

## How it works

Read side (compact blob references flow through shuffle):

- Instead of `byte[0]`, blob columns now serialize compact ~100-byte `BlobReference` descriptors containing the source dataset URI, column name, and row address
- The scanner requests `_rowaddr` when blob columns are present, and strips it from the output after extracting the row addresses

Write side (blob references are resolved to actual bytes):

- `BlobResolvingLargeBinaryWriter` detects `BlobReference` magic headers in incoming binary values
- … `Dataset.takeBlobs()` to fetch the actual blob bytes

## Key files

| File | Description |
| --- | --- |
| `BlobReference.java` | … (`LBRF` magic + dataset URI + column name + row address) |
| `BlobReferenceResolver.java` | … `takeBlobs()` → returns actual bytes |
| `BaseBlobJoinTest.java` | … |
| `BlobJoinTest.java` (3.4 + 3.5) | … |
| `BlobStructAccessor.java` | … `getBlobReference()` |
| `LanceArrowColumnVector.java` | `getBinary()` returns blob reference instead of `byte[0]` |
| `LanceFragmentScanner.java` | … `_rowaddr`, exposes dataset URI and blob column names |
| `LanceFragmentColumnarBatchScanner.java` | … `_rowaddr`, sets blob context on `BlobStructAccessor`, strips implicit `_rowaddr` |
| `LanceArrowWriter.scala` | … `BlobResolvingLargeBinaryWriter` for blob-encoded columns |
| `BaseBlobCreateTableTest.java` | … |

## Test plan

- `BlobJoinTest.testBlobPreservedDuringInsertIntoSelect`: Simple `INSERT INTO target SELECT FROM source` preserves blob data
- `BlobJoinTest.testBlobPreservedDuringJoinAndInsert`: `JOIN` of two blob tables preserves both blob columns in target
- `BlobJoinTest.testNonBlobColumnsPreservedDuringJoinWithBlobs`: Non-blob columns survive JOIN+INSERT alongside blob columns
- `BlobCreateTableTest` tests pass (9/9)

🤖 Generated with Claude Code